Graph Analytics in Spark

1 Algorithms over graphs

GraphFrame provides the parallel implementation of a set of state of the art algorithms for graph analytics

  • Breadth first search
  • Shortest paths
  • Connected components
  • Strongly connected component
  • Label propagation
  • PageRank

Also custom algorithms can be designed and implemented.

1.1 Checkpoint directory

To run some expensive algorithms, set a checkpoint directory that will store the state of the job at every iteration. This allows to continue where left off if the job crashes. Create such a folder to set the checkpoint directory with:

sc.setCheckpointDir(graphframes_ckpts_dir)
  • graphframes_ckpts_dir is the new checkpoint folder directory
  • sc is the SparkContext object (retrieve it from a SparkSession by using spark.sparkContext)

1.3 Shortest path

The shortest path method selects the length of the shortest path(s) from each vertex to a given set of landmark vertexes. It uses the BFS algorithm for computing the shortest paths.

1.3.1 Implementation

shortestPaths(landmarks)

The shortestPaths(landmarks) method of the GraphFrame class returns the length of the shortest path(s) from each vertex to a given set of landmarks vertexes. For each vertex, one shortest path for each landmark vertex is computed and its length is returned.

  • landmarks: list of IDs of landmark vertexes (e.g., \([u1, u4]\))

shortestPaths() returns a DataFrame that contains one record/row for each distinct vertex of the input graph (also for the non-connected ones). This method is characterized by the following columns

  • one column for each attribute of the vertexes
  • distances (type map): for each landmark lm it contains one pair (ID lm: length shortest path from the vertex of the current record to lm)
  1. Find for each user the length of the shortest path to user \(u1\) (i.e., Alice)
  2. Store the result in a DataFrame

Example graph

Vertex Distance to \(u1\)
\(u1\) 0
\(u2\) -
\(u3\) -
\(u4\) 1
\(u5\) 2
\(u6\) -
\(u7\) -

The content of the returned DataFrame is the following

id name age distances
\(u1\) Alice 34 \([u1 \rightarrow 0]\)
\(u2\) Bob 36 \([\quad]\)
\(u3\) Charlie 30 \([\quad]\)
\(u4\) David 29 \([u1 \rightarrow 1]\)
\(u5\) Esther 32 \([u1 \rightarrow 2]\)
\(u6\) Fanny 36 \([\quad]\)
\(u7\) Gabby 60 \([\quad]\)
  • \([u1 \rightarrow 0]\): data type is map. It stores a set of pairs (Key: Value)
from graphframes import GraphFrame

# Vertex DataFrame
v = spark.createDataFrame(
    [
        ("u1", "Alice", 34),
        ("u2", "Bob", 36),
        ("u3", "Charlie", 30),
        ("u4", "David", 29),
        ("u5", "Esther", 32),
        ("u6", "Fanny", 36),
        ("u7", "Gabby", 60)
    ],
    ["id", "name", "age"]
)

# Edge DataFrame
e = spark.createDataFrame(
    [
        ("u1", "u2", "friend"),
        ("u2", "u3", "follow"),
        ("u3", "u2", "follow"),
        ("u6", "u3", "follow"),
        ("u5", "u6", "follow"),
        ("u5", "u4", "friend"),
        ("u4", "u1", "friend"),
        ("u1", "u5", "friend")
    ],
    ["src", "dst", "relationship"]
)

# Create the graph
g = GraphFrame(v, e)

# Find for each user the length of the shortest path to user u1
shortestPaths = g.shortestPaths(["u1"])
  1. Find for each user the length of the shortest path to users \(u1\) (Alice) and \(u4\) (David)
  2. Store the result in a DataFrame

Example graph

Vertex Distance to \(u1\) Distance to \(u1\)
\(u1\) 0 2
\(u2\) - -
\(u3\) - -
\(u4\) 1 0
\(u5\) 2 1
\(u6\) - -
\(u7\) - -

The content of the returned DataFrame is the following

id name age distances
\(u1\) Alice 34 \([u1 \rightarrow 0, u4 \rightarrow 2]\)
\(u2\) Bob 36 \([\quad]\)
\(u3\) Charlie 30 \([\quad]\)
\(u4\) David 29 \([u1 \rightarrow 1, u4 \rightarrow 0]\)
\(u5\) Esther 32 \([u1 \rightarrow 2, u4 \rightarrow 1]\)
\(u6\) Fanny 36 \([\quad]\)
\(u7\) Gabby 60 \([\quad]\)
  • \([u1 \rightarrow 0]\): data type is map. It stores a set of pairs (Key: Value)
from graphframes import GraphFrame

# Vertex DataFrame
v = spark.createDataFrame(
    [
        ("u1", "Alice", 34),
        ("u2", "Bob", 36),
        ("u3", "Charlie", 30),
        ("u4", "David", 29),
        ("u5", "Esther", 32),
        ("u6", "Fanny", 36),
        ("u7", "Gabby", 60)
    ],
    ["id", "name", "age"]
)

# Edge DataFrame
e = spark.createDataFrame(
    [
        ("u1", "u2", "friend"),
        ("u2", "u3", "follow"),
        ("u3", "u2", "follow"),
        ("u6", "u3", "follow"),
        ("u5", "u6", "follow"),
        ("u5", "u4", "friend"),
        ("u4", "u1", "friend"),
        ("u1", "u5", "friend")
    ],
    ["src", "dst", "relationship"]
)

# Create the graph
g = GraphFrame(v, e)

# Find for each user the length of the shortest paths to users u1 and u4
shortestPaths = g.shortestPaths(["u1", "u4"])

1.4 Connected components

A connected component of a graph is a subgraph \(sg\) such that

  • Any two vertexes in \(sg\) are connected to each other by at least one path
  • The set of vertexes in \(sg\) is not connected to any additional vertexes in the original graph

The direction of edges is not considered.

Two connected components

Three connected components

The connectedComponents() method of the GraphFrame class returns the connected components of the input graph. This is an expensive algorithm, and requires setting a Spark checkpoint directory.

1.4.1 Implementation

connectedComponents()

The connectedComponents() method returns a DataFrame that contains one record/row for each distinct vertex of the input graph. It is characterized by the following columns

  • one column for each attribute of the vertexes
  • component (type long). It is the identifier of the connected component to which the current vertex has been assigned.

Print on the stdout the number of connected components of the following graph

Example graph

Result

Notice that The are two connected components on this graph.

This is the content of the DataFrame used to store the two identified connected components

id name age component
\(u6\) Fanny 36 146028888064
\(u1\) Alice 34 146028888064
\(u3\) Charlie 30 146028888064
\(u5\) Esther 32 146028888064
\(u2\) Bob 36 146028888064
\(u4\) David 29 146028888064
\(u7\) Gabby 60 1546188226560

Notice the “component” field

  • “146028888064”: vertexes of the first component
  • “1546188226560”: vertexes of the second component
from graphframes import GraphFrame

# Vertex DataFrame
v = spark.createDataFrame(
    [
        ("u1", "Alice", 34),
        ("u2", "Bob", 36),
        ("u3", "Charlie", 30),
        ("u4", "David", 29),
        ("u5", "Esther", 32),
        ("u6", "Fanny", 36),
        ("u7", "Gabby", 60)
    ],
    ["id", "name", "age"]
)

# Edge DataFrame
e = spark.createDataFrame(
    [
        ("u1", "u2", "friend"),
        ("u2", "u3", "follow"),
        ("u3", "u2", "follow"),
        ("u6", "u3", "follow"),
        ("u5", "u6", "follow"),
        ("u5", "u4", "friend"),
        ("u4", "u1", "friend"),
        ("u1", "u5", "friend")
    ],
    ["src", "dst", "relationship"]
)

# Create the graph
g = GraphFrame(v, e)

# Set checkpoint folder
sc.setCheckpointDir("tmp_ckpts")

# Run the algorithm
connComp=g.connectedComponents()

# Count the number of components
nComp=connComp.select("component").distinct().count()

print("Number of connected components: ", nComp)

1.5 Strongly connected components

A directed subgraph \(sg\) is called strongly connected if every vertex in \(sg\) is reachable from every other vertex in \(sg\). For undirected graph, connected and strongly connected components are the same.

Three strongly connected subgraphs/components

1.5.1 Implementation

stronglyConnectedComponents()

The stronglyConnectedComponents() method of the GraphFrame class returns the strongly connected components of the input graph. It is an expensive algorithm (better to run it on a cluster with yarn scheduler even with small graphs), and it requires setting a Spark checkpoint directory.

stronglyConnectedComponents() returns a DataFrame that contains one record/row for each distinct vertex of the input graph. It is characterized by the following columns

  • one column for each attribute of the vertexes
  • component (type long). It is the identifier of the strongly connected component to which the current vertex has been assigned.

Print on the stdout the number of strongly connected components of the input graph.

Example graph

Resulting graph

Notice that there are four connected components on this graph.

This is the content of the DataFrame used to store the identified strongly connected components

id name age component
\(u3\) Charlie 30 146028888064
\(u2\) Bob 36 146028888064
\(u1\) Alice 34 498216206336
\(u5\) Esther 32 498216206336
\(u4\) David 29 498216206336
\(u6\) Fanny 36 1090921693184
\(u7\) Gabby 60 1546188226560

Notice the “component” field

  • “146028888064”: vertexes of the first strongly connected component
  • “498216206336”: vertexes of the second strongly connected component
  • “1090921693184”: vertexes of the third strongly connected component
  • “1546188226560”: vertexes of the fourth strongly connected component
from graphframes import GraphFrame

# Vertex DataFrame
v = spark.createDataFrame(
    [
        ("u1", "Alice", 34),
        ("u2", "Bob", 36),
        ("u3", "Charlie", 30),
        ("u4", "David", 29),
        ("u5", "Esther", 32),
        ("u6", "Fanny", 36),
        ("u7", "Gabby", 60)
    ],
    ["id", "name", "age"]
)

# Edge DataFrame
e = spark.createDataFrame(
    [
        ("u1", "u2", "friend"),
        ("u2", "u3", "follow"),
        ("u3", "u2", "follow"),
        ("u6", "u3", "follow"),
        ("u5", "u6", "follow"),
        ("u5", "u4", "friend"),
        ("u4", "u1", "friend"),
        ("u1", "u5", "friend")
    ],
    ["src", "dst", "relationship"]
)

# Create the graph
g = GraphFrame(v, e)

# Set checkpoint folder
sc.setCheckpointDir("tmp_ckpts")

# Run the algorithm
strongConnComp = g.stronglyConnectedComponents(maxIter=10)

# Count the number of strongly connected components
nComp=strongConnComp.select("component").distinct().count()

print("Number of strongly connected components: ", nComp)

1.6 Label propagation

Label Propagation is an algorithm for detecting communities in graphs. It is similar to clustering, but exploits connectivity. Convergence is not guaranteed, and also it is possible to end up with trivial solutions.

1.6.1 The Label Propagation algorithm

Each vertex in the network is initially assigned to its own community: at every step, vertexes send their community affiliation to all neighbors and update their state to the mode community affiliation of incoming messages.

1.6.2 Implementation

labelPropagation(maxIter)

The labelPropagation(maxIter) method of the GraphFrame class runs and returns the result of the label propagation algorithm.

  • maxIter: number of iterations to run

labelPropagation() returns a DataFrame that contains one record/Row for each distinct vertex of the input graph. It is characterized by the following columns

  • one column for each attribute of the vertexes
  • label (type long). It is the identifier of the community to which the current vertex has been assigned.

Split in groups the vertexes of the graph by using the label propagation algorithm.

Example graph

Notice that the result returned by one run of the algorithm. Pay attention that convergence is not guarantee, and different results may come out from different runs.

Results

This is the content of the DataFrame used to store the identified communities

id name age label
\(u3\) Charlie 30 146028888064
\(u4\) David 29 498216206336
\(u1\) Alice 34 498216206336
\(u5\) Esther 32 498216206337
\(u7\) Gabby 60 1546188226560
\(u2\) Bob 36 1606317768704
\(u6\) Fanny 36 1606317768704
  • “146028888064”: vertexes of the first community
  • “498216206336”: vertexes of the second community
  • “1546188226560”: vertexes of the third community
  • “1606317768704”: vertexes of the fourth community
from graphframes import GraphFrame

# Vertex DataFrame
v = spark.createDataFrame(
    [
        ("u1", "Alice", 34),
        ("u2", "Bob", 36),
        ("u3", "Charlie", 30),
        ("u4", "David", 29),
        ("u5", "Esther", 32),
        ("u6", "Fanny", 36),
        ("u7", "Gabby", 60)
    ],
    ["id", "name", "age"]
)

# Edge DataFrame
e = spark.createDataFrame(
    [
        ("u1", "u2", "friend"),
        ("u2", "u3", "follow"),
        ("u3", "u2", "follow"),
        ("u6", "u3", "follow"),
        ("u5", "u6", "follow"),
        ("u5", "u4", "friend"),
        ("u4", "u1", "friend"),
        ("u1", "u5", "friend")
    ],
    ["src", "dst", "relationship"]
)

# Create the graph
g = GraphFrame(v, e)

# Run the label propagation algorithm
labelComm = g.labelPropagation(10)

1.7 PageRank

PageRank is the original famous algorithm used by the Google Search engine to rank vertexes (web pages) in a graph by order of importance. For the Google search engine vertexes are web pages in the World Wide Web, and edges are hyperlinks among web pages. This algorithm assigns a numerical weighting (importance) to each node.

It computes a likelihood that a person randomly clicking on links will arrive at any particular web page. For having a high PageRank, it is important to

  • Have many in-links
  • Be liked by relevant pages (pages characterized by a high PageRank)

The basic idea is that each link vote is proportional to the importance of its source page \(p\): if page \(p\) with importance \(\text{PageRank}(p)\) has \(n\) out-links, each out-link gets \(\frac{\text{PageRank}(p)}{n}\) votes; the importance of page \(p\) is the sum of the votes on its in-links.

1.7.1 Simple recursive formulation

  • Initialize each page rank to \(1.0\): for each \(p\) in pages set \(\textbf{PageRank}(p)\) to \(1.0\)
  • Iterate for \(max\) iterations
    1. Page \(p\) sends a contribution \(\frac{\textbf{PageRank}(p)}{\textbf{numOutLinks}(p)}\) to its neighbors (the pages it links);
    2. Update each page rank \(\textbf{PageRank}(p)\) with the sum of the received contributions.

1.7.2 Random jumps formulation

The PageRank algorithm simulates the “random walk” of a user on the web. Indeed, at each step of the random walk, the random surfer has two options:

  • with probability \(1-\alpha\), follow a link at random among the ones in the current page;

  • with probability \(\alpha\), jump to a random page.

  • Initialize each page rank to \(1.0\): for each \(p\) in pages set \(\textbf{PageRank}(p)\) to \(1.0\)

  • Iterate for max iterations

    1. Page \(p\) sends a contribution \(\frac{\textbf{PageRank}(p)}{\textbf{numOutLinks}(p)}\) to its neighbors (the pages it links);
    2. Update each page rank \(\textbf{PageRank}(p)\) to \(\alpha + (1 - \alpha)\) times the sum of the received contributions.
  • \(\alpha=0.15\)
  • Initialization: \(\forall{p}, \textbf{PageRank}(p) = 1.0\)

Initialization

Figure 1: Iterations

Iteration #1

Iteration #2

Iteration #3

Iteration #4

Iteration #5

Iteration #50

1.7.3 Implementation

pageRank(resetProbability, maxIter, tol, sourceId)

The pageRank() method of the GraphFrame class runs the PageRank algorithm on the input graph.

  • resetProbability: probability of resetting to a random vertex (probability \(\alpha\) associated with random jumps);
  • maxIter: if set, the algorithm is run for a fixed number of iterations; this may not be set if the tol parameter is set;
  • tol: if set, the algorithm is run until the given tolerance; this may not be set if the numIter parameter is set;
  • sourceId: the source vertex for a personalized PageRank (optional parameter).

pageRank() returns a new GraphFrame that contains the same vertexes and edges of the input graph

  • the vertexes of the new graph are characterized by one new attribute, called “pagerank”, that stores the PageRank of the vertexes;
  • the edges of the new graph are characterized by one new attribute, called “weight”, that stores the weight (PageRank contribution) propagated through that edge.

Apply the PageRank algorithm on the following graph and select the user associated with the highest PageRank value.

Example graph

Resulting graph

from graphframes import GraphFrame

# Vertex DataFrame
v = spark.createDataFrame(
    [
        ("u1", "Alice", 34),
        ("u2", "Bob", 36),
        ("u3", "Charlie", 30),
        ("u4", "David", 29),
        ("u5", "Esther", 32),
        ("u6", "Fanny", 36),
        ("u7", "Gabby", 60)
    ],
    ["id", "name", "age"]
)

# Edge DataFrame
e = spark.createDataFrame(
    [
        ("u1", "u2", "friend"),
        ("u2", "u3", "follow"),
        ("u3", "u2", "follow"),
        ("u6", "u3", "follow"),
        ("u5", "u6", "follow"),
        ("u5", "u4", "friend"),
        ("u4", "u1", "friend"),
        ("u1", "u5", "friend")
    ],
    ["src", "dst", "relationship"]
)

# Create the graph
g = GraphFrame(v, e)

# Run the PageRank algorithm
pageRanks = g.pageRank(maxIter=30)

# Select the maximum value of PageRank
maxPageRank = pageRanks.vertices \
    .agg({"pagerank":"max"}) \
    .first()["max(pagerank)"]

# Select the user with the maximum PageRank
pageRanks.vertices \
    .filter(pageRanks.vertices.pagerank==maxPageRank) \
    .show()

1.8 Custom graph algorithms

GraphFrames provides primitives for developing yourself other graph algorithms. It is based on message passing approach: the two key components are:

  • aggregateMessages: it sends messages between vertexes, and aggregate messages for each vertex
  • joins: it joins message aggregates with the original graph

For each user, compute the sum of the ages of adjacent users (count many times the same adjacent user if there are many links).

The resulting table is

Vertex SumAges
\(u1\) 97
\(u2\) 94
\(u3\) 108
\(u4\) 66
\(u5\) 99
\(u6\) 62
from graphframes import GraphFrame

# Vertex DataFrame
v = spark.createDataFrame(
    [
        ("u1", "Alice", 34),
        ("u2", "Bob", 36),
        ("u3", "Charlie", 30),
        ("u4", "David", 29),
        ("u5", "Esther", 32),
        ("u6", "Fanny", 36),
        ("u7", "Gabby", 60)
    ],
    ["id", "name", "age"]
)

# Edge DataFrame
e = spark.createDataFrame(
    [
        ("u1", "u2", "friend"),
        ("u2", "u3", "follow"),
        ("u3", "u2", "follow"),
        ("u6", "u3", "follow"),
        ("u5", "u6", "follow"),
        ("u5", "u4", "friend"),
        ("u4", "u1", "friend"),
        ("u1", "u5", "friend")
    ],
    ["src", "dst", "relationship"]
)

# Create the graph
g = GraphFrame(v, e)

# For each user, sum the ages of the adjacent users
# Send the age of each destination of an edge to its source
msgToSrc = AggregateMessages.dst["age"]

# Send the age of each source of an edge to its destination
msgToDst = AggregateMessages.src["age"]

# Aggregate messages
aggAge = g.aggregateMessages(
    sum(AggregateMessages.msg),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst
)

#Show result
aggAge.show()